/**
 * Copyright (c) 2015-2017, Henry Yang 杨勇 (gismail@foxmail.com).
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.lambkit.plugin.redis;

import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.db.nosql.redis.RedisDS;
import com.lambkit.core.Lambkit;
import com.lambkit.core.cache.BaseCache;
import com.lambkit.plugin.redis.serializer.ProtostuffSerializer;

import java.util.*;

/**
 * Redis-backed {@link BaseCache} implementation.
 * <p>
 * Every entry is stored under a composite Redis key built by
 * {@link #buildKey(String, Object)}: {@code cacheName:I:key} for numeric keys,
 * {@code cacheName:S:key} for {@code String}/{@code StringBuffer}/{@code StringBuilder}
 * keys and {@code cacheName:O:key} for everything else. When the cache name is blank
 * the key is used as-is. All entries of one cache are looked up with the pattern
 * {@code cacheName:*}.
 * <p>
 * The Redis client is created from the {@code "redisDS"} attribute of the Lambkit
 * context. If that attribute is not set, {@link #redis()} returns {@code null} and the
 * cache operations fail with a {@code NullPointerException}.
 */
public class RedisCacheImpl extends BaseCache {

    private RedisCache redis;

    /** Cache names seen by the write operations, mapped to the number of writes recorded. */
    private Map<String, Integer> cacheNames = MapUtil.newConcurrentHashMap();

    /**
     * Creates the cache and eagerly tries to build the Redis client; when the
     * {@code "redisDS"} attribute is not available yet, creation is retried on each
     * call to {@link #redis()}.
     */
    public RedisCacheImpl() {
        redis = createRedis();
    }

    /**
     * Returns the Redis client, creating it on first use if the constructor could not.
     *
     * @return the client, or {@code null} when no {@code "redisDS"} attribute is set
     */
    public RedisCache redis() {
        if (redis == null) {
            redis = createRedis();
        }
        return redis;
    }

    /** Builds a client from the context's {@code "redisDS"} attribute, or returns null if absent. */
    private static RedisCache createRedis() {
        RedisDS redisDS = Lambkit.context().getAttr("redisDS");
        return redisDS == null ? null : new RedisCache(redisDS, new ProtostuffSerializer());
    }

    @Override
    public String getCacheType() {
        return "redis";
    }

    /**
     * Reads the value stored under {@code key} in the given cache.
     *
     * @param cacheName logical cache name
     * @param key       entry key
     * @return whatever the Redis client returns for the composite key
     */
    @Override
    public <T> T get(String cacheName, Object key) {
        return redis().get(buildKey(cacheName, key));
    }

    /**
     * Stores {@code value} under {@code key} without an expiry. A {@code null} value is
     * silently ignored.
     *
     * @param cacheName logical cache name
     * @param key       entry key
     * @param value     value to store; ignored when {@code null}
     */
    @Override
    public void put(String cacheName, Object key, Object value) {
        if (value == null) {
            // A null value ends in a NullPointerException inside
            // redis.clients.jedis.Protocol.sendCommand, so skip the write.
            return;
        }
        putCacheName(cacheName);
        redis().set(buildKey(cacheName, key), value);
    }

    /**
     * Counts the Redis keys matching {@code cacheName:*}.
     *
     * @param cacheName logical cache name
     * @return number of matching keys
     */
    @Override
    public Long size(String cacheName) {
        return Long.valueOf(redis().keys(keyPattern(cacheName)).size());
    }

    /**
     * Records a write against {@code cacheName} so that it is reported by
     * {@link #getCacheNames()}.
     *
     * @param cacheName logical cache name; {@code null} is not recorded
     */
    public void putCacheName(String cacheName) {
        if (cacheName == null) {
            // The backing concurrent map rejects null keys; buildKey treats a blank
            // cache name as "no prefix", so there is simply nothing to register.
            return;
        }
        // merge() performs the read-modify-write as one atomic step on the concurrent
        // map, so concurrent writers cannot lose increments.
        cacheNames.merge(cacheName, 1, Integer::sum);
    }

    /**
     * @return the cache names recorded by {@link #putCacheName(String)} in this instance
     */
    @Override
    public String[] getCacheNames() {
        // A zero-length seed lets toArray size the result itself instead of relying on
        // a separate size() call that may be stale under concurrent updates.
        return cacheNames.keySet().toArray(new String[0]);
    }

    /**
     * Lists the keys of a cache with the {@code cacheName:X:} prefix removed.
     *
     * @param cacheName logical cache name
     * @return the entry keys as strings
     */
    @Override
    public List getKeys(String cacheName) {
        List<String> keys = new ArrayList<String>();
        for (String fullKey : redis().keys(keyPattern(cacheName))) {
            keys.add(stripPrefix(cacheName, fullKey));
        }
        return keys;
    }

    /**
     * Lists the Redis keys matching {@code cacheName:*keySearch}.
     * <p>
     * Unlike {@link #getKeys(String)}, the returned keys are the full Redis keys,
     * including the {@code cacheName:X:} prefix.
     *
     * @param cacheName logical cache name
     * @param keySearch suffix appended to the {@code cacheName:*} pattern
     * @return the matching Redis keys
     */
    @Override
    public List<String> getKeys(String cacheName, String keySearch) {
        // Go through redis() rather than the field so the client is lazily created
        // when it was not available at construction time.
        Collection<? extends String> keys = redis().keys(keyPattern(cacheName) + keySearch);
        return new ArrayList<>(keys);
    }

    /**
     * Fetches the value of every key matching {@code cacheName:*}, one lookup per key.
     *
     * @param cacheName logical cache name
     * @return the values, in the order the keys were returned
     */
    @Override
    public Collection getValues(String cacheName) {
        RedisCache client = redis();
        List<Object> values = new ArrayList<Object>();
        for (String fullKey : client.keys(keyPattern(cacheName))) {
            values.add(client.get(fullKey));
        }
        return values;
    }

    /**
     * Fetches every entry of a cache, one lookup per key.
     *
     * @param cacheName logical cache name
     * @return map from entry key (prefix removed) to value
     */
    @Override
    public Map getAll(String cacheName) {
        RedisCache client = redis();
        Map<String, Object> entries = new HashMap<>();
        for (String fullKey : client.keys(keyPattern(cacheName))) {
            entries.put(stripPrefix(cacheName, fullKey), client.get(fullKey));
        }
        return entries;
    }

    /**
     * Deletes a single entry.
     *
     * @param cacheName logical cache name
     * @param key       entry key
     */
    @Override
    public void remove(String cacheName, Object key) {
        redis().del(buildKey(cacheName, key));
    }

    /**
     * Deletes every key matching {@code cacheName:*}.
     *
     * @param cacheName logical cache name
     */
    @Override
    public void removeAll(String cacheName) {
        String[] keys = redis().keys(keyPattern(cacheName)).toArray(new String[0]);
        if (keys.length == 0) {
            // Redis DEL needs at least one key; nothing matched, so nothing to delete.
            return;
        }
        redis().del(keys);
    }

    /** Returns the Redis pattern that matches every key of the given cache. */
    private static String keyPattern(String cacheName) {
        return cacheName + ":*";
    }

    /** Drops the {@code cacheName:X:} prefix (cache name plus three characters) from a Redis key. */
    private static String stripPrefix(String cacheName, String fullKey) {
        return fullKey.substring(cacheName.length() + 3);
    }

    /**
     * Builds the Redis key for an entry.
     *
     * @param cacheName logical cache name; when blank, {@code key} is returned unchanged
     * @param key       entry key; must not be {@code null} when {@code cacheName} is not blank
     * @return {@code cacheName:I:key} for numbers, {@code cacheName:S:key} for
     *         {@code String}/{@code StringBuffer}/{@code StringBuilder},
     *         {@code cacheName:O:key} otherwise
     */
    private Object buildKey(String cacheName, Object key) {
        if (StrUtil.isBlank(cacheName)) {
            return key;
        }
        String typeTag;
        if (key instanceof Number) {
            typeTag = "I";
        } else {
            Class<?> keyClass = key.getClass();
            boolean textual = String.class.equals(keyClass)
                    || StringBuffer.class.equals(keyClass)
                    || StringBuilder.class.equals(keyClass);
            typeTag = textual ? "S" : "O";
        }
        return String.format("%s:%s:%s", cacheName, typeTag, key);
    }

    /**
     * Stores {@code value} under {@code key} with a time to live.
     *
     * @param cacheName   logical cache name
     * @param key         entry key
     * @param value       value to store; ignored when {@code null}
     * @param liveSeconds time to live in seconds; when {@code <= 0} the value is stored
     *                    without an expiry
     */
    @Override
    public void put(String cacheName, Object key, Object value, int liveSeconds) {
        if (value == null) {
            // A null value ends in a NullPointerException inside
            // redis.clients.jedis.Protocol.sendCommand, so skip the write.
            return;
        }
        if (liveSeconds <= 0) {
            put(cacheName, key, value);
            return;
        }
        putCacheName(cacheName);
        redis().setex(buildKey(cacheName, key), liveSeconds, value);
    }

    /**
     * Alias of {@link #put(String, Object, Object, int)}.
     *
     * @param cacheName   logical cache name
     * @param key         entry key
     * @param value       value to store; ignored when {@code null}
     * @param liveSeconds time to live in seconds; when {@code <= 0} the value is stored
     *                    without an expiry
     */
    public void setex(String cacheName, Object key, Object value, int liveSeconds) {
        put(cacheName, key, value, liveSeconds);
    }

    /**
     * Sets the time to live of an entry.
     *
     * @param cacheName logical cache name
     * @param key       entry key
     * @param seconds   time to live in seconds
     * @return the result reported by the Redis client
     */
    @Override
    public Long expire(String cacheName, Object key, int seconds) {
        return redis().expire(buildKey(cacheName, key), seconds);
    }

    /**
     * Returns the result of the Redis {@code TTL} command for an entry.
     *
     * @param cacheName logical cache name
     * @param key       entry key
     * @return the value reported by {@code TTL}
     */
    @Override
    public long getExpire(String cacheName, String key) {
        // NOTE(review): the handle obtained from getJedis() is not released here.
        // Confirm whether RedisCache manages its lifecycle or whether it must be
        // closed after use to avoid exhausting the connection pool.
        return redis().getJedis().ttl(redis().keyToBytes(buildKey(cacheName, key)));
    }

    /**
     * Pushes values onto the list stored under {@code key} and records the cache name.
     *
     * @param cacheName logical cache name
     * @param key       list key
     * @param values    values to push
     */
    @Override
    public void lpush(String cacheName, Object key, Object... values) {
        putCacheName(cacheName);
        redis().lpush(buildKey(cacheName, key), values);
    }

    /**
     * @param cacheName logical cache name
     * @param key       list key
     * @return the list length reported by the Redis client
     */
    @Override
    public Long llen(String cacheName, Object key) {
        return redis().llen(buildKey(cacheName, key));
    }

    /**
     * Removes occurrences of {@code value} from the list stored under {@code key}.
     *
     * @param cacheName logical cache name
     * @param key       list key
     * @param count     count argument passed to the Redis {@code LREM} call
     * @param value     value to remove
     */
    @Override
    public void lrem(String cacheName, Object key, int count, Object value) {
        redis().lrem(buildKey(cacheName, key), count, value);
    }

    /**
     * @param cacheName logical cache name
     * @param key       list key
     * @param start     start index passed to the Redis {@code LRANGE} call
     * @param end       end index passed to the Redis {@code LRANGE} call
     * @return the list elements returned by the Redis client
     */
    @Override
    public List lrange(String cacheName, Object key, int start, int end) {
        return redis().lrange(buildKey(cacheName, key), start, end);
    }

    /**
     * Removes members from the set stored under {@code key}.
     *
     * @param cacheName logical cache name
     * @param key       set key
     * @param members   members to remove
     */
    @Override
    public void srem(String cacheName, Object key, Object... members) {
        redis().srem(buildKey(cacheName, key), members);
    }

    /**
     * @param cacheName logical cache name
     * @param key       set key
     * @return the members of the set as returned by the Redis client
     */
    @Override
    public Set smembers(String cacheName, Object key) {
        return redis().smembers(buildKey(cacheName, key));
    }

    /**
     * @param cacheName logical cache name
     * @param key       set key
     * @return the set cardinality reported by the Redis client
     */
    @Override
    public Long scard(String cacheName, Object key) {
        return redis().scard(buildKey(cacheName, key));
    }

    /**
     * Adds members to the set stored under {@code key}.
     *
     * @param cacheName logical cache name
     * @param key       set key
     * @param members   members to add
     * @return the result reported by the Redis client
     */
    @Override
    public Long sadd(String cacheName, Object key, Object... members) {
        return redis().sadd(buildKey(cacheName, key), members);
    }

    /**
     * Adds {@code value} to the set stored under {@code key}, then sets the set's time
     * to live. The two steps are separate Redis calls.
     *
     * @param cacheName logical cache name
     * @param key       set key
     * @param value     member to add
     * @param seconds   time to live in seconds
     */
    @Override
    public void saddAndExpire(String cacheName, Object key, Object value, int seconds) {
        sadd(cacheName, key, value);
        expire(cacheName, key, seconds);
    }
}
